[アップデート] Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータの取り込みをサポートしました
リテールアプリ共創部の中野です。
Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータ取り込みをサポートするようになったというアップデートが公開されました。
OpenSearch Integration は、OpenSearch Service ドメインへデータの抽出、変換、取り込みを一括で簡単に行えるような機能をサポートしています。
以前から S3 や Kinesis Firehose、DynamoDB などのサービスからの取り込みをサポートしていました。
このアップデートによって、Kinesis Data Streams 経由で大量にストリーミングされるデータをリアルタイム分析したいようなケースでの活用が可能になっています。
アプリケーションのログを OpenSearch でリアルタイム分析したいなどの用途で使えそうですね。
やってみた
0. 全体構成
以下のような構成で OpenSearch Service へ Lambda のアプリケーションログを書き込むような状況を作ってみます。
この構成で OpenSearch Dashboard でログを確認できるかみてみましょう。
Lambda から出力されたアプリケーションログを CloudWatch Logs へ書き込みます。
その後、CloudWatch Logs のサブスクリプションフィルターで Kinesis Data Streams へログデータを送信します。
その後、OpenSearch Integration が Kinesis Data Streams のストリーミングデータを消費して、OpenSearch のインデックスへ書き込みを行います。
最終的にユーザーが OpenSearch のデータを視覚的に確認したい場合は、EC2 の踏み台サーバー経由でセッションマネージャーを利用してアクセスすることで確認できます。
なお、OpenSearch Service のドメインを作成するために手動だとかなり時間がかかってしまいます。
そこで、以下の AWS CDK のサンプルを使わせていただいて、事前に環境を用意しました。
1. Lambda の用意
アプリケーションのログを確認できるように Lambda を準備します。
Lambda のランタイムはなんでもよいですが、今回は Node.js を使いました。
また、適当なログを書き込むために以下のようなソースを用意します。
export const handler = async (event) => {
console.error("エラーです");
return "エラーです";
};
2. Kinesis Data Streams の構築
次に、ログをストリーミングする Kinesis Data Streams をつくります。
オンデマンドモードで作成してシャードを自動スケールするようにしておきます。
3. CloudWatch Logs Subscription Filter の設定
サブスクリプションフィルターを作成する前に、事前に CloudWatch Logs から Kinesis Data Streams へ PutRecord を許可する IAM ロールを作成します。
基本的には以下の公式ドキュメントを参考にしました。
IAM ロールの信頼ポリシーは以下のようにしました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringLike": {
"aws:SourceArn": "arn:aws:logs:ap-northeast-1:<account-id>:*"
}
}
}
]
}
IAM ポリシーは以下です。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kinesis:PutRecord",
"Resource": "arn:aws:kinesis:ap-northeast-1:<account-id>:stream/kinesis-data-streams-to-open-search"
}
]
}
次に、Lambda から書き込まれたログのサブスクリプションフィルターを設定します。
ロググループ単位でのフィルターを作成します。
アクセス許可を付与する設定で、さきほど作成した IAM ロールをアタッチします。
ログ形式はその他にしてそのまま作成します。
ここまでで、Lambda から生成されたログを Kinesis Data Streams へ書き込みできるようになったので、テスト実行してみます。
Lambda のテスト実行を行って書き込みを行われていることを確認します。
データビューアーのタブから適当なシャードを選択して、開始位置を水平トリムにしてレコードを取得します。
パーティションキー毎にバイナリデータが書き込まれていれば準備完了です。
ここで、バイナリデータになっているのは gzip 形式で書き込みが行われているためです。
4. OpenSearch ドメインの構築
それでは、OpenSearch ドメインを構築してきます。
OpenSearch 自体はイチから作成すると設定を選んだり構築完了まで待つのに 20〜30 分程度かかってしまうため、前述で触れた AWS CDK のサンプルコードを利用させていただいて構築します。
package.json のモジュールバージョンは若干古かったため以下を利用しました。
{
"name": "opensearch-vpc-cdk",
"version": "0.1.0",
"bin": {
"opensearch-vpc-cdk": "bin/opensearch-vpc-cdk.js"
},
"scripts": {
"build": "tsc",
"watch": "tsc -w",
"test": "jest",
"cdk": "cdk"
},
"devDependencies": {
"@types/jest": "^29.5.14",
"@types/node": "22.9.0",
"aws-cdk": "2.167.1",
"jest": "^29.7.0",
"ts-jest": "^29.2.5",
"ts-node": "^10.9.2",
"typescript": "~5.6.3"
},
"dependencies": {
"@aws-cdk/aws-lambda-python-alpha": "^2.167.1-alpha.0",
"@aws-sdk/client-iam": "^3.693.0",
"aws-cdk-lib": "2.167.1",
"constructs": "^10.4.2",
"source-map-support": "^0.5.21"
}
}
AWS CDK のコードは以下の通りです。
#!/usr/bin/env node
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import { OpensearchVpcCdkStack } from "../lib/opensearch-vpc-cdk-stack";
const app = new cdk.App();
new OpensearchVpcCdkStack(app, "OpensearchVpcCdkStack", {});
Lambda のインデクシングするコードは検証で不要でしたので削除しました。
OpenSearch のエンジンバージョンも古かったため、現状の最新である v2.15 を指定しました。
import * as cdk from "aws-cdk-lib";
import { CfnOutput, RemovalPolicy } from "aws-cdk-lib";
import {
BastionHostLinux,
BlockDeviceVolume,
MachineImage,
Peer,
Port,
SecurityGroup,
Vpc,
} from "aws-cdk-lib/aws-ec2";
import {
AnyPrincipal,
CfnServiceLinkedRole,
PolicyStatement,
} from "aws-cdk-lib/aws-iam";
import { Domain, EngineVersion } from "aws-cdk-lib/aws-opensearchservice";
import { Construct } from "constructs";
import { IAMClient, ListRolesCommand } from "@aws-sdk/client-iam";
const iam = new IAMClient({});
export class OpensearchVpcCdkStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// VPC
const vpc = new Vpc(this, "Vpc", {});
// Security Group
const bastionSecurityGroup = new SecurityGroup(
this,
"BastionSecurityGroup",
{
vpc: vpc,
allowAllOutbound: true,
securityGroupName: "BastionSecurityGroup",
}
);
const opensearchSecurityGroup = new SecurityGroup(
this,
"OpensearchSecurityGroup",
{
vpc: vpc,
securityGroupName: "OpensearchSecurityGroup",
}
);
opensearchSecurityGroup.addIngressRule(bastionSecurityGroup, Port.tcp(443));
// Service-linked role that Amazon OpenSearch Service will use
(async () => {
const response = await iam.send(
new ListRolesCommand({
PathPrefix: "/aws-service-role/opensearchservice.amazonaws.com/",
})
);
// Only if the role for OpenSearch Service doesn't exist, it will be created.
if (response.Roles && response.Roles?.length == 0) {
new CfnServiceLinkedRole(this, "OpensearchServiceLinkedRole", {
awsServiceName: "es.amazonaws.com",
});
}
})();
// Bastion host to access Opensearch Dashboards
new BastionHostLinux(this, "BastionHost", {
vpc,
securityGroup: bastionSecurityGroup,
machineImage: MachineImage.latestAmazonLinux2023(),
blockDevices: [
{
deviceName: "/dev/xvda",
volume: BlockDeviceVolume.ebs(10, {
encrypted: true,
}),
},
],
});
// OpenSearch domain
const domain = new Domain(this, "Domain", {
version: EngineVersion.OPENSEARCH_2_15,
nodeToNodeEncryption: true,
enforceHttps: true,
encryptionAtRest: {
enabled: true,
},
vpc: vpc,
capacity: {
dataNodes: 2,
},
removalPolicy: RemovalPolicy.DESTROY,
zoneAwareness: {
enabled: true,
},
securityGroups: [opensearchSecurityGroup],
});
domain.addAccessPolicies(
new PolicyStatement({
principals: [new AnyPrincipal()],
actions: ["es:ESHttp*"],
resources: [domain.domainArn + "/*"],
})
);
}
}
モジュールをインストールしてデプロイします。
npm ci
npx cdk bootstrap
npx cdk deploy
OpenSearch ドメインが構築されました。
セッションマネージャーで自身のローカル環境から OpenSearch Dashboard にアクセスできますので確認します。
事前にセッションマネージャープラグインも必要なので、ローカルにインストールしていない場合はインストールします。
--target 部分には踏み台用 EC2 インスタンスのインスタンス ID を入力します。
--parameters の host には OpenSearch の VPC ドメインエンドポイントをいれますが、https
は含めずに FQDN のみ入力してください。
aws ssm start-session --target <EC2のインスタンスID> --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["*******.ap-northeast-1.es.amazonaws.com"]}'
Starting session with SessionId: **************
Port 8157 opened for sessionId **************
Waiting for connections...
Waiting for connections
という文字がでていればセッションとしては確立されているため OpenSearch Dashboard にアクセスできます。
ブラウザから、https://localhost:8157/_dashboards
にアクセスして以下のような画面がでれば OK です。
5. OpenSearch Integration のパイプライン構築
最後に、Kinesis Data Streams のデータを取り込んで OpenSearch Service へデータを変換してロードするパイプラインを構築します。
事前に IAM ロールを作成します。
IAM ロールの信頼ポリシーは以下です。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "osis-pipelines.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
一方で、IAM ポリシーは以下のように定義します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "allowReadFromStream",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:DescribeStreamConsumer",
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:ListStreamConsumers",
"kinesis:RegisterStreamConsumer",
"kinesis:SubscribeToShard"
],
"Resource": ["arn:aws:kinesis:ap-northeast-1:<account-id>:stream/*"]
},
{
"Sid": "allowAccessToOS",
"Effect": "Allow",
"Action": ["es:DescribeDomain", "es:ESHttp*"],
"Resource": [
"arn:aws:es:ap-northeast-1:<account-id>:domain/domain66ac69e0-xwfaqbkvba0m",
"arn:aws:es:ap-northeast-1:<account-id>:domain/domain66ac69e0-xwfaqbkvba0m/*"
]
}
]
}
この IAM ポリシーによって、OpenSearch Integration から Kinesis Data Streams への読み込みアクセスと OpenSearch Integration から OpenSearch ドメインへの読み込み書き込みアクセスが許可されます。
IAM ロールを作成できたら、Integration パイプラインを構築します。
パイプラインの構成で、設定を書いていきます。
注意点として、IAM ロールは、OpenSearch Service の sink 定義と Kinesis Data Streams の source 定義の両方で同じである必要があります。
また、processor でデータの変換定義をしています。
CloudWatch のロググループ名を使用してさまざまなログタイプのデータセットとして識別して OpenSearch のインデックスにドキュメントを書き込みます。
add_entries には、ロググループ名や Kinesis のデータストリーム名などのメタデータを付与できるように指定しています。
version: "2"
kinesis-pipeline:
source:
kinesis-data-streams:
acknowledgments: true
codec:
# Kinesisレコードが集約されているかどうかに基づいて、json、newline、またはndjsonコーデックを選択できます。
# JSONコーデックは、ネストされたCloudWatchイベントを個々のログエントリに解析し、OpenSearchにドキュメントとして書き込みます。
json:
key_name: "logEvents"
# これらのキーには、CloudWatchサブスクリプションフィルターによって送信されたメタデータが含まれています。
# 個々のログイベントに加えて:
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
# include_keys: [ 'owner', 'logGroup', 'logStream' ]
newline:
streams:
- stream_name: "kinesis-data-streams-to-open-search"
# ストリームの開始から取り込みを開始する場合はこれを有効にします。
initial_position: "EARLIEST"
# checkpoint_interval: "PT5M"
# CloudWatchの場合、圧縮は常にgzipになりますが、他のソースの場合は異なります:
compression: "gzip"
aws:
# KDSへのアクセスを持つRole ARNを提供します。このロールはosis-pipelines.amazonaws.comとの信頼関係を持つ必要があります。
sts_role_arn: "arn:aws:iam::<account-id>:role/kds-to-es-integration-pipeline-role"
# データストリームのリージョンを提供します。
region: "ap-northeast-1"
processor:
- rename_keys:
entries:
# CloudWatchのタイムスタンプを観測タイムスタンプとして含めます - ログが生成され、CloudWatchに送信された時間:
- from_key: "timestamp"
to_key: "observed_timestamp"
- date:
# OSIがログイベントを処理した現在のタイムスタンプを含めます:
from_time_received: true
destination: "processed_timestamp"
- add_entries:
entries:
# SS4O共通ログフィールドをサポートします (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
- key: "cloud/provider"
value: "aws"
- key: "cloud/account/id"
format: "${owner}"
- key: "cloud/region"
value: "ap-northeast-1"
- key: "aws/cloudwatch/log_group"
format: "${logGroup}"
- key: "aws/cloudwatch/log_stream"
format: "${logStream}"
# data_streamのデフォルト値を含めます:
- key: "data_stream/namespace"
value: "default"
- key: "data_stream/type"
value: "logs"
- key: "data_stream/dataset"
value: "general"
# このログイベントを含むソースKinesisメッセージに関するメタデータを含めます:
- key: "aws/kinesis/stream_name"
value_expression: 'getMetadata("stream_name")'
- key: "aws/kinesis/partition_key"
value_expression: 'getMetadata("partition_key")'
- key: "aws/kinesis/sequence_number"
value_expression: 'getMetadata("sequence_number")'
- key: "aws/kinesis/sub_sequence_number"
value_expression: 'getMetadata("sub_sequence_number")'
- add_entries:
entries:
# ログイベントのコンテキストに基づいてdata_streamフィールドを更新します - この場合、ソース(CloudTrailまたはLambda)によってログイベントを分類します。
# ビジネスまたはアプリケーションコンテキストによってログを分類するための追加のロジックを追加できます:
- key: "data_stream/dataset"
value: "cloudtrail"
add_when: 'contains(/logGroup, "cloudtrail") or contains(/logGroup, "CloudTrail")'
overwrite_if_key_exists: true
- key: "data_stream/dataset"
value: "lambda"
add_when: 'contains(/logGroup, "/aws/lambda/")'
overwrite_if_key_exists: true
- key: "data_stream/dataset"
value: "apache"
add_when: 'contains(/logGroup, "/apache/")'
overwrite_if_key_exists: true
# デフォルトのCloudWatchフィールドを削除します。これらはSS4Oフィールドに再マッピングされました:
- delete_entries:
with_keys:
- "logGroup"
- "logStream"
- "owner"
# 非JSONのapacheログを解析するためにGrokパーサーを使用します
- grok:
grok_when: '/data_stream/dataset == "apache"'
match:
message: ["%{COMMONAPACHELOG_DATATYPED}"]
target_key: "http"
# OpenSearchインデックスでのフィールドレベルの検索をサポートするために、ログデータをJSONとして解析しようとします:
- parse_json:
# ルートメッセージオブジェクトをaws.cloudtrailに解析して、SS4OログのSS4O標準に一致させます
source: "message"
destination: "aws/cloudtrail"
parse_when: '/data_stream/dataset == "cloudtrail"'
tags_on_failure: ["json_parse_fail"]
- parse_json:
# Lambda関数ログの場合、可能であればルートメッセージオブジェクトをJSONとして解析します - 非JSONのログ関数データを検索可能なフィールドとしてキャプチャするためにGrokサポートを設定することもできます
source: "message"
destination: "aws/lambda"
parse_when: '/data_stream/dataset == "lambda"'
tags_on_failure: ["json_parse_fail"]
- parse_json:
# 一般ログの場合、可能であればルートメッセージオブジェクトをJSONとして解析します
source: "message"
destination: "body"
parse_when: '/data_stream/dataset == "general"'
tags_on_failure: ["json_parse_fail"]
sink:
- opensearch:
# AWS OpenSearch Serviceドメインエンドポイントを提供します
hosts: ["https://****************.ap-northeast-1.es.amazonaws.com"]
# ログデータをログコンテキストに応じて異なるターゲットインデックスにルーティングします:
index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
aws:
# ドメインへのアクセスを持つRole ARNを提供します。このロールはosis-pipelines.amazonaws.comとの信頼関係を持つ必要があります。
# このロールは、上記のKinesisで使用したロールと同じでなければなりません。
sts_role_arn: "arn:aws:iam::<account-id>:role/kds-to-es-integration-pipeline-role"
# ドメインのリージョンを提供します。
region: "ap-northeast-1"
# シンクがAmazon OpenSearch Serverlessコレクションである場合、'serverless'フラグを有効にします
serverless: false
上記の設定を書き込んで検証ボタンがあるので教えて構文が問題ないか確認します。
OpenSearch ドメインが VPC 内にあるリソースのため、VPC のリソースへ接続できるように VPC アクセスを設定します。
問題が発生したときにトラブルシューティングしやすいように、パイプライン用の CloudWatch ログ記録を設定しておきます。
ステータスがアクティブになれば、パイプラインの構築は完了です。
6. OpenSearch Dashboard を見てみた
これで全体構成のすべてのリソースの準備が完了しました。
Lambda のマネージメントコンソールからテスト実行を使って複数回実行して強制出力させます。
しばらく経つと、OpenSearch Dashboard の Discover から Index を作成できるようになるので作成します。
以下のように 8 件のログが OpenSearch Dashboard 上で見られるようになってました。
7. (オプション) 後片付け
検証用途の場合、そのままにしていると OpenSearch や Kinesis Data Streams、踏み台の EC2 インスタンスで高額の料金がかかってしまうため、削除しておきましょう。
- OpenSearch Service と踏み台 EC2 の削除(
npx cdk destroy
) - OpenSearch Integration の削除
- Kinesis Data Streams の削除
さいごに
Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータ取り込みが新たにサポートされるようになりました。
これまでは、Kinesis Data Streams のデータを OpenSearch へ投入するには、Lambda で OpenSearch へ書き込みに行くか、Firehose を経由して OpenSearch へ書き込む方法などがあげられます。
しかし、アーキテクチャが複雑になったり、コード書いて保守する必要があったり、複数のリソースを経由することでオーバーヘッドの増加が考えられます。
今回のアップデートで、OpenSearch Integration に直接 Kinesis Data Streams のデータを連携させられるため、運用コストが削減されるメリットや、よりリアルタイムでのデータ連携や分析が可能になったかと思います。
re:Invent 2024 が近づいていることもあり、Kinesis や OpenSearch Service の新たなアップデートも気になります。
この記事が、誰かのお役に立てれば。